package apiTest;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.Delete;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.HTablePool;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class HBaseTest {

	private static Logger hbaseLogger = Logger.getLogger("org.apache");
	private static Logger logger = Logger.getLogger(ReadFromHbase.class);
	
	// 声明静态配置

	static Configuration conf = null;

	static final HTablePool tablePool;

	static {

		conf = HBaseConfiguration.create();

		conf.set("hbase.zookeeper.quorum", "192.168.12.58");

		tablePool = new HTablePool(conf, 15);

	}

	/*
	 * 
	 * 创建表
	 * 
	 * @tableName 表名
	 * 
	 * @family 列族数组
	 * 
	 */

	public static void creatTable(String tableName, String[] family)

			throws Exception {

		HBaseAdmin admin = new HBaseAdmin(conf);

		HTableDescriptor desc = new HTableDescriptor(tableName);

		for (int i = 0; i < family.length; i++) {

			desc.addFamily(new HColumnDescriptor(family[i]));

		}

		if (admin.tableExists(tableName)) {

			System.out.println("table Exists!");

			System.exit(0);

		} else {

			admin.createTable(desc);

			System.out.println("create table Success!");

		}

	}

	/*
	 * 
	 * 表添加数据
	 * 
	 * @rowKey rowKey
	 * 
	 * @tableName 表名
	 * 
	 * @column1 第一个列族数组 realname
	 * 
	 * @value1 第一个列的值的数组
	 * 
	 * @column2 第二个列族数组 address
	 * 
	 * @value2 第二个列的值的数组
	 * 
	 */

	public static void addTableData(String rowKey, String tableName, String[] column1, String[] value1,
			String[] column2, String[] value2)

			throws IOException {

		Put put = new Put(Bytes.toBytes(rowKey));

		HTable table = (HTable) tablePool.getTable(tableName);

		HColumnDescriptor[] columnFamilies = table.getTableDescriptor()

				.getColumnFamilies();

		for (int i = 0; i < columnFamilies.length; i++) {

			String familyName = columnFamilies[i].getNameAsString();

			if (familyName.equals("realname")) {

				for (int j = 0; j < column1.length; j++) {

					put.add(Bytes.toBytes(familyName), Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));

				}

			}

			if (familyName.equals("address")) {

				for (int j = 0; j < column2.length; j++) {

					put.add(Bytes.toBytes(familyName), Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j]));

				}

			}

		}

		table.put(put);

	}

	/*
	 * 
	 * 更新表中的某一列
	 * 
	 * @tableName 表名
	 * 
	 * @rowKey rowKey
	 * 
	 * @familyName 列族名
	 * 
	 * @columnName 列名
	 * 
	 * @value 更新后的值
	 * 
	 */

	public static void updateTable(String tableName, String rowKey,

			String familyName, String columnName, String value)

			throws IOException {

		HTable table = (HTable) tablePool.getTable(tableName);

		Put put = new Put(Bytes.toBytes(rowKey));

		put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(value));

		table.put(put);

		System.out.println("update table Success!");

	}

	/*
	 * 
	 * 根据rowkey查询
	 * 
	 * @rowKey rowKey
	 * 
	 * @tableName 表名
	 * 
	 */

	public static Result getResult(String tableName, String rowKey)

			throws IOException {

		Get get = new Get(Bytes.toBytes(rowKey));

		HTable table = (HTable) tablePool.getTable(tableName);

		Result result = table.get(get);

		for (KeyValue kv : result.list()) {

			System.out.println("family==>" + Bytes.toString(kv.getFamily()));

			System.out.println("qualifier==>" + Bytes.toString(kv.getQualifier()));

			System.out.println("value==>" + Bytes.toString(kv.getValue()));

			System.out.println("Timestamp==>" + kv.getTimestamp());

		}

		return result;

	}

	/*
	 * 
	 * 遍历查询hbase表数组
	 * 
	 * @tableName 表名
	 * 
	 */

	public static void getResultScann(String tableName) throws IOException {

		Scan scan = new Scan();

		ResultScanner rs = null;

		// HTable table = (HTable) tablePool.getTable(tableName);

		@SuppressWarnings("resource")
		HTable table = new HTable(conf, Bytes.toBytes(tableName));

		try {

			rs = table.getScanner(scan);
			File file = new File("/home/kfh/content.txt");
			FileWriter fw = new FileWriter(file);

			for (Result r : rs) {

				for (KeyValue kv : r.list()) {

					System.out.println("family==>" + Bytes.toString(kv.getFamily()));
					System.out.println("qualifier==>" + Bytes.toString(kv.getQualifier()));

					System.out.println("value==>" + Bytes.toString(kv.getValue()));
					fw.write(Bytes.toString(kv.getValue()));

					System.out.println("timestamp==>" + kv.getTimestamp());

				}

			}
			fw.close();
		} finally {

			rs.close();

		}

	}

	/*
	 * 
	 * 查询表中的某单一列
	 * 
	 * @tableName 表名
	 * 
	 * @rowKey rowKey
	 * 
	 */

	public static void getResultByColumn(String tableName, String rowKey,

			String familyName, String columnName) throws IOException {

		// HTable table = (HTable) tablePool.getTable(tableName);

		@SuppressWarnings("resource")
		HTable table = new HTable(conf, Bytes.toBytes(tableName));

		Get get = new Get(Bytes.toBytes(rowKey));
		// get.addColumn("f");

		get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // 获取指定列族以及列中修饰符对应列名

		Result result = table.get(get);

		for (KeyValue kv : result.list()) {

			System.out.println("family==>" + Bytes.toString(kv.getFamily()));

			System.out.println("qualifier==>" + Bytes.toString(kv.getQualifier()));

			System.out.println("value==>" + Bytes.toString(kv.getValue()));

			System.out.println("Timestamp==>" + kv.getTimestamp());

		}

	}

	public static void getColumn(String tableName, String familyName, String columnName) throws IOException{
		Configuration conf = HBaseConfiguration.create();
		conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 21600000);
		conf.setLong(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100);
		 conf.set("hbase.zookeeper.quorum", "localhost");
		@SuppressWarnings("resource")
		HTable table = new HTable(conf, Bytes.toBytes(tableName)); // get table
		
		byte[] lastRow = null;
		int pagenum = 0;
		String siteUrl = null;

		logger.info("-------------------new Scan-------------------");
		Scan scan = new Scan();
		ResultScanner resultScanner = table.getScanner(scan); // queryAll
		System.out.println("aaaaaaa");
		Iterator<Result> results = resultScanner.iterator();

		while (results.hasNext()) {
			try {
				++pagenum;
				logger.info("已经抓取到Hbase的第" + pagenum + "条数据");
				Result result = results.next();
				byte[] contentValue = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("cnt"));
				if (contentValue == null || contentValue.length == 0) {
					continue;
				}
				ByteArrayInputStream bai = new ByteArrayInputStream(contentValue);
				String a = CharSetUtil.getStreamCharSet(bai, "utf-8");
				String content = new String(contentValue, a);
				System.out.println(content);
			} catch (Exception e) {
				logger.error(e.toString(), e);
				logger.error(siteUrl);
				continue;
			}
		}
		resultScanner.close();
		logger.info("-----------------close scanner--------------------");

	}

	/*
	 * 
	 * 查询某列数据的多个版本
	 * 
	 * @tableName 表名
	 * 
	 * @rowKey rowKey
	 * 
	 * @familyName 列族名
	 * 
	 * @columnName 列名
	 * 
	 */

	public static void getResultByVersion(String tableName, String rowKey,

			String familyName, String columnName) throws IOException {

		HTable table = (HTable) tablePool.getTable(tableName);

		Get get = new Get(Bytes.toBytes(rowKey));

		get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));

		get.setMaxVersions(5);

		Result result = table.get(get);

		for (KeyValue kv : result.list()) {

			System.out.println("family==>" + Bytes.toString(kv.getFamily()));

			System.out.println("qualifier==>" + Bytes.toString(kv.getQualifier()));

			System.out.println("value==>" + Bytes.toString(kv.getValue()));

			System.out.println("Timestamp==>" + kv.getTimestamp());

		}

	}

	/*
	 * 
	 * 删除指定的列
	 * 
	 * @tableName 表名
	 * 
	 * @rowKey rowKey
	 * 
	 * @familyName 列族名
	 * 
	 * @columnName 列名
	 * 
	 */

	public static void deleteColumn(String tableName, String rowKey,

			String falilyName, String columnName) throws IOException {

		HTable table = (HTable) tablePool.getTable(tableName);

		Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));

		deleteColumn.deleteColumns(Bytes.toBytes(falilyName), Bytes.toBytes(columnName));

		table.delete(deleteColumn);

		System.out.println(falilyName + "==>" + columnName + "is deleted!");

	}

	/*
	 * 
	 * 删除指定的列
	 * 
	 * @tableName 表名
	 * 
	 * @rowKey rowKey
	 * 
	 */

	public static void deleteAllColumn(String tableName, String rowKey) throws IOException {

		HTable table = (HTable) tablePool.getTable(tableName);

		Delete deleteAll = new Delete(Bytes.toBytes(rowKey));

		table.delete(deleteAll);

		System.out.println("all columns are deleted!");

	}

	/*
	 * 
	 * 删除表
	 * 
	 * 
	 * 
	 * @tableName 表名
	 * 
	 */

	public static void deleteTable(String tableName) throws IOException {

		HBaseAdmin admin = new HBaseAdmin(conf);

		admin.disableTable(tableName);

		admin.deleteTable(tableName);

		System.out.println(tableName + " is deleted!");

	}

	// Java Hbase main函数测试类，具体代码如下：

	public static void main(String[] args) throws Exception {

		// 创建表

		// String tableName = "yoodbblog";
		//
		// String[] family = { "realname", "address" };
		//
		// HBaseTest.creatTable(tableName, family);

		// 为表添加数据

		// String[] column1 = { "title", "author", "content" };
		//
		// String[] value1 = { "素文宅", "yoodb", "www.yoodb.com" };
		//
		// String[] column2 = { "name", "nickname" };
		//
		// String[] value2 = { "真实名称", "昵称" };
		//
		// HBaseTest.addTableData("rowkey1", "yoodbblog", column1, value1,
		// column2, value2);

		// 删除一列

		// HBaseTest.deleteColumn("yoodbblog", "rowkey1", "realname", "name");

		// 删除所有列

		// HBaseTest.deleteAllColumn("yoodbblog", "rowkey1");

		// 删除表

		// HBaseTest.deleteTable("yoodbblog");

		// 查询

		// HBaseTest.getResult("yoodbblog", "rowkey1");

		// 查询某一列的值

		// HBaseTest.getResultByColumn("ntrprsnmntlztn_webpage", "rowkey1",
		// "realname", "nickname");
		// HBaseTest.getResultByColumn("ntrprsnmntlztn_webpage",
		// "cn.aliqidian.www:http/Detail_gsml.aspx?entId=1156104",
		// "p", "c");
//		HBaseTest.getColumn(tableName, familyName, columnName);
//		HBaseTest.getColumn("ntrprsnmntlztn_webpage", "p", "c");
		// 修改某一列的值

		// HBaseTest.updateTable("yoodbblog", "rowkey1", "realname", "nickname",
		// "假昵称");

		// 遍历表数据查询

		 HBaseTest.getResultScann("enterprise_webpage");

		// 查询某列的多版本

		// HBaseTest.getResultByVersion("yoodbblog", "rowkey1", "realname",
		// "name");

	}
}
